Dynamically subscribing to topics with Kafka
1. Components and dependencies

Uses the spring-kafka package (Maven coordinates):

    groupId:    org.springframework.kafka
    artifactId: spring-kafka
    version:    2.5.3.RELEASE

2. Configuration class

    @Bean("aiKafkaListenerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Metadata refresh interval in milliseconds; this bounds how quickly
        // newly created topics that match the pattern are discovered
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 12000);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(
                new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer()));
        return factory;
    }
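3. Consumer class

    // Replace {topic prefix} with the actual prefix of the topics to consume
    @KafkaListener(topicPattern = "{topic prefix}.*", containerFactory = "aiKafkaListenerFactory")
    public void onMessage(ConsumerRecord<String, String> record) {
        log.info("Kafka message consumed: topic=" + record.topic() + ";content=" + record.value());
        try {
            // Handle business logic
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace is recorded
            log.error("Kafka consumption failed: " + record.value(), e);
        }
    }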
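Notes:

1. The value of topicPattern is a regular expression; the listener consumes from every topic whose name matches it.
2. How it works: the Kafka consumer underlying the Spring listener container periodically refreshes its topic metadata and re-subscribes so that any topics now matching the pattern are included. The refresh interval is set by ConsumerConfig.METADATA_MAX_AGE_CONFIG (metadata.max.age.ms, in milliseconds). It is 12000 in the configuration above; Kafka's default is 300000, i.e. 5 minutes.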
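To check which topic names a given topicPattern would pick up, the matching can be tried out with plain java.util.regex, without a broker. The following is a minimal sketch, not the spring-kafka implementation: the class TopicPatternDemo, the method matchingTopics, and the "ai.events." prefix are hypothetical names chosen for illustration, and it assumes the consumer requires the pattern to match the whole topic name. It needs Java 9 or later because of List.of.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of spring-kafka or kafka-clients.
class TopicPatternDemo {

    // Returns the topics whose full name matches the regex, in input order.
    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed prefix "ai.events."; the dots are escaped so they match literally.
        String regex = "ai\\.events\\..*";

        // Topic list as seen at one metadata refresh.
        List<String> first = List.of("ai.events.order", "ai-events-order", "other.topic");
        System.out.println(matchingTopics(regex, first));

        // A new matching topic exists by the next refresh and joins the subscription.
        List<String> second = List.of("ai.events.order", "ai.events.user", "other.topic");
        System.out.println(matchingTopics(regex, second));
    }
}
```

If the dots in the prefix are left unescaped, as in "ai.events..*", each dot matches any character, so a topic such as ai-events-order would also be consumed. Escaping the prefix avoids subscribing to unintended topics.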